package cn.gupao.service;

import cn.gupao.dao.HistoryEventDao;
import cn.gupao.dao.HistoryEventStateDaoImpl;
import cn.gupao.dao.ProfileDao;
import cn.gupao.pojo.CombineCondition;
import cn.gupao.pojo.EventStateBean;
import cn.gupao.pojo.LogBean;
import cn.gupao.pojo.RuleCondition;
import cn.gupao.utils.EventComparator;
import cn.gupao.utils.MatchUtils;
import cn.gupao.utils.TimeBoundUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.utils.ParameterTool;

import java.util.List;

@Slf4j
public class QueryService {

    private ProfileDao profileDao;
    private HistoryEventDao eventStateDao;
    private HistoryEventDao historyEventDao;

    public void init(ParameterTool parameterTool, ListState<EventStateBean> eventsState) throws Exception {
        String profileDaoClazz = parameterTool.getRequired("profile.dao.impl");
        //通过反射创建实例
        profileDao = (ProfileDao) Class.forName(profileDaoClazz).newInstance();
        profileDao.init(parameterTool);


        //创建查询状态的DAO（Data Access Object）
        eventStateDao = new HistoryEventStateDaoImpl(eventsState);

        String historyEventDaoClazz = parameterTool.getRequired("history.event.dao.impl");
        //通过反射创建实例
        historyEventDao = (HistoryEventDao) Class.forName(historyEventDaoClazz).newInstance();
        historyEventDao.init(parameterTool);
        //profileDao = new ProfileHbaseDaoImpl(parameterTool);
        //historyEventDao = new HistoryEventClickHouseDaoImpl(parameterTool);

    }

    public void destroy() throws Exception {
        profileDao.close();
    }


    public boolean isMatchTriggerEvent(LogBean bean, RuleCondition ruleCondition) throws Exception {

        //1.匹配当前行为数据
        return EventComparator.isMatchTriggerEvent(bean, ruleCondition.getTriggerEvent());

    }


    public boolean isMatchProfile(LogBean bean, RuleCondition ruleCondition) throws Exception {

        return profileDao.isMatchProfile(bean, ruleCondition.getProfileCondition());
    }


    public boolean isMatchHistoryEvent(LogBean bean, RuleCondition ruleCondition) throws Exception {

       return isMatchHistoryEvent(bean, ruleCondition.getCombineConditions());
    }


    public boolean isMatch(LogBean bean, RuleCondition ruleCondition) throws Exception {

        //1.匹配当前行为数据
        boolean isMatchTriggerEvent = EventComparator.isMatchTriggerEvent(bean, ruleCondition.getTriggerEvent());
        if (!isMatchTriggerEvent) {
            return false;
        }


        //2.匹配用户画像{tag2 = v1, tag20 = v2}
        boolean isMatchProfile = profileDao.isMatchProfile(bean, ruleCondition.getProfileCondition());
        if (!isMatchProfile) {
            return false;
        }

        boolean isMatchHistoryEvent = isMatchHistoryEvent(bean, ruleCondition.getCombineConditions());
        if (!isMatchHistoryEvent) return false;

        return true;
    }


    private boolean isMatchHistoryEvent(LogBean bean, List<CombineCondition> combineConditions) throws Exception {

        for (CombineCondition combineCondition : combineConditions) {

            //根据数据中携带的timeStamp计算查询时间边界
            long timeBound = TimeBoundUtil.getTimeBound(bean.getTimeStamp());
            //事先配置的规则的起始时间、结束时间
            long startTime = combineCondition.getStartTime();
            long endTime = combineCondition.getEndTime();
            //实现配置好的正则表达式例如(1)就是行为次数，如果是(.*?1.*?2.*?3)就是行为序列
            String regex = combineCondition.getRegex();
            //实现设置好的次数条件取出来
            Integer minLimit = combineCondition.getMinLimit();
            //根据时间和条件进行查询
            if (startTime >= timeBound) {
                //到状态中查询
                String sequenceStr = eventStateDao.queryEventSequenceStr(bean, combineCondition);
                int count = MatchUtils.matchCount(sequenceStr, regex);
                if (count < minLimit) return false;
            } else if (endTime <= timeBound) {
                //到clickHouse中查询
                String sequenceStr = historyEventDao.queryEventSequenceStr(bean, combineCondition);
                int count = MatchUtils.matchCount(sequenceStr, regex);
                if (count < minLimit) return false;
            } else {
                //（跨时间边界的）先要到状态中查询，如果状态中满足，直接返回
                long ruleStartTime = combineCondition.getStartTime();
                combineCondition.setStartTime(timeBound);
                String sequenceStr1 = eventStateDao.queryEventSequenceStr(bean, combineCondition);
                combineCondition.setStartTime(ruleStartTime);
                int count = MatchUtils.matchCount(sequenceStr1, regex);
                if (count < minLimit) {
                    long ruleEndTime = combineCondition.getEndTime();
                    combineCondition.setEndTime(timeBound);
                    String sequenceStr2 = historyEventDao.queryEventSequenceStr(bean, combineCondition);
                    combineCondition.setEndTime(ruleEndTime);
                    String resSeq = sequenceStr2 + sequenceStr1;
                    int count2 = MatchUtils.matchCount(resSeq, regex);
                    if (count2 < minLimit) return false;
                }
            }

        }
        return true;
    }

    //按照时间触发的规则
    public boolean isMatchTimerCondition(String deviceId, long timestamp, CombineCondition combineCondition) throws Exception {

        LogBean bean = new LogBean();
        bean.setDeviceId(deviceId);

        //在一段时间范围内，没做过某个事件，或者没完成某个序列（或者数没有达到指定的次数）
        //根据数据中携带的timeStamp计算查询时间边界
        long timeBound = TimeBoundUtil.getTimeBound(timestamp);
        //事先配置的规则的起始时间、结束时间
        long startTime = combineCondition.getStartTime();
        long endTime = combineCondition.getEndTime();
        //实现配置好的正则表达式例如(1)就是行为次数，如果是(.*?1.*?2.*?3)就是行为序列
        String regex = combineCondition.getRegex();
        //实现设置好的次数条件取出来
        Integer minLimit = combineCondition.getMinLimit();

        //根据时间和条件进行查询
        if (startTime >= timeBound) {
            //到状态中查询
            String sequenceStr = eventStateDao.queryEventSequenceStr(bean, combineCondition);
            int count = MatchUtils.matchCount(sequenceStr, regex);
            if (count < minLimit) return true;
        }
        if (startTime < timeBound && endTime > timeBound) {
            //（跨时间边界的）先要到状态中查询，如果状态中满足，直接返回
            long ruleStartTime = combineCondition.getStartTime();
            combineCondition.setStartTime(timeBound);
            String sequenceStr1 = eventStateDao.queryEventSequenceStr(bean, combineCondition);
            combineCondition.setStartTime(ruleStartTime);
            int count = MatchUtils.matchCount(sequenceStr1, regex);
            if (count < minLimit) {
                long ruleEndTime = combineCondition.getEndTime();
                combineCondition.setEndTime(timeBound);
                String sequenceStr2 = historyEventDao.queryEventSequenceStr(bean, combineCondition);
                combineCondition.setEndTime(ruleEndTime);
                String resSeq = sequenceStr2 + sequenceStr1;
                int count2 = MatchUtils.matchCount(resSeq, regex);
                if (count2 < minLimit) return true;
            }
        }
        return false;
    }
}
